<?php

declare(strict_types=1);

namespace MnsClient;

use AliyunMNS\Config;
use AliyunMNS\Exception\MnsException;
use AliyunMNS\Responses\ReceiveMessageResponse;
use AliyunMNS\Traits\MessagePropertiesForPeek;
use Hyperf\Context\ApplicationContext;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Process\Annotation\Process;
use Hyperf\Process\ProcessManager;
use Hyperfx\Framework\Exception\AppException;
use Hyperfx\Framework\Logger\Logx;
use Hyperfx\Utils\AppEnvUtil;
use Hyperfx\Utils\RunModeUtil;
use Psr\Container\ContainerInterface;
use Hyperf\Process\AbstractProcess;


abstract class MnsAbstractProcess extends AbstractProcess {

    protected array $config;

    protected MnsClient $client;

    protected int $restartInterval = 1;

    /* @param $payload Message */
    abstract protected function consume($payload): array;

    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
        $objConfig = $container->get(ConfigInterface::class);
        $key = sprintf('mns_queue.%s', $this->queue);
        $this->config = $objConfig->get($key);

        $this->name = "mns-queue.{$this->queue}";
        $this->nums = $this->config['processes'] ?? 1;

        $this->client = ApplicationContext::getContainer()->get(MnsFactory::class)->get();
    }

    public function handle(): void
    {
        $messageCount = 0;
        $maxMessages = $this->config['max_messages'];

        while (ProcessManager::isRunning()) {
            try {
                $traceId = Logx::nextTraceId();
                if ($maxMessages > 0 && $messageCount >= $maxMessages) {
                    break;
                }

                $message = $this->pop();
                if (false === $message) {
                    continue;
                }

                $callback = parallel([function() use ($traceId, $message) {
                    Logx::nextTraceId($traceId);
                    return $this->consume($message);
                }]);

                $status = $callback[0][0] ?? '';

                switch ($status) {
                    // 成功
                    case Result::ACK: {
                        $this->delete($message->getReceiptHandle());
                        break;
                    }
                    // 失败
                    case Result::NACK: {
                        Logx::get()->error('处理消息失败', [
                            'body' => $message->getMessageBody(),
                            'message_id' => $message->getMessageId(),
                            'dequeue_count' => $message->getDequeueCount()
                        ]);
                        break;
                    }
                    // 重试
                    case Result::REQUEUE: {
                        $visibilityTimeout = (int) $callback[0][1];
                        $this->retry($message->getReceiptHandle(), $visibilityTimeout);
                        break;
                    }
                    default:{
                        Logx::get()->alert('返回状态错误', [
                            'body' => $message->getMessageBody(),
                            'message_id' => $message->getMessageId(),
                            'dequeue_count' => $message->getDequeueCount()
                        ]);
                    }
                }

            } catch (\Exception $e) {

                Logx::get()->alert('消息处理失败', [
                    'message' => $e->getMessage(),
                    'line' => $e->getLine(),
                    'code' => $e->getCode(),
                    'trace' => $e->getTraceAsString()
                ]);
                continue;

            } finally {
                ++$messageCount;
            }
        }
    }

    private function pop(): ReceiveMessageResponse|bool {
        return $this->client->receiveMessage($this->config['queue_name'], $this->config['wait_seconds'] ?: 5, false, true);
    }

    protected function delete($receiptHandle): bool {
        return $this->client->deleteMessage($this->config['queue_name'], $receiptHandle);
    }

    protected function retry($receiptHandle, int $visibilityTimeout): bool {
        return $this->client->changeMessageVisibility($this->config['queue_name'], $receiptHandle, $visibilityTimeout);
    }
}